package com.imis.module.websocket;

import com.alibaba.fastjson.JSONObject;
import com.imis.base.constant.CacheConstant;
import com.imis.base.constant.CommonConstant;
import com.imis.base.constant.enums.ArgumentResponseEnum;
import com.imis.base.util.RedisUtil;
import com.imis.module.system.bus.SysWebsocketMessageBus;
import com.imis.module.system.model.ro.SysWebsocketMessageAddRO;
import com.imis.module.websocket.dto.WebSocketMessageDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * <p>
 * WebSocket<br>
 * 前后端交互的类实现消息的接收推送<br>
 * 使用@ServerEndpoint(javax.websocket.server.ServerEndpoint)注解，表示当前类是 webSocket 服务终端，同时在里面实现客户端连接建立、发送消息、接收消息等通信业务；此注解相当于设置访问URL<br>
 * 使用@Component，表示当前类将此组件交由 Spring 管理<br>
 * </p>
 *
 * @author XinLau
 * @version 1.0
 * @since 2020年07月21日 10:29
 */
@Slf4j
@ServerEndpoint(value = "/websocket/{userIdentification}")
@Component
public class WebSocket {

    /**
     * WebSocket信息传递记录 业务处理类
     */
    private SysWebsocketMessageBus sysWebsocketMessageBus;

    @Autowired
    public void setSysWebsocketMessageBus(SysWebsocketMessageBus sysWebsocketMessageBus) {
        this.sysWebsocketMessageBus = sysWebsocketMessageBus;
    }

    /**
     * 存储所有在线客户端的Session
     * key:userIdentification;
     * value:session
     */
    private static final Map<String, Session> CLIENTS = new ConcurrentHashMap<>();

    /**
     * Redis 工具类
     */
    protected RedisUtil redisUtil;

    @Autowired
    public void setRedisUtil(RedisUtil redisUtil) {
        this.redisUtil = redisUtil;
    }

    /**
     * 递增并获取
     *
     * @return Double - 递增后数量
     * @author XinLau
     * @creed The only constant is change ! ! !
     * @since 2020/7/24 11:01
     */
    private Double incrementAndGet() {
        return redisUtil.hincr(CacheConstant.SESSION_ID_CLIENT_ID, CacheConstant.SESSION_ONLINE_COUNT, 1);
    }

    /**
     * 递减并获取
     *
     * @return Double - 递减后数量
     * @author XinLau
     * @creed The only constant is change ! ! !
     * @since 2020/7/24 11:01
     */
    private Double decrementAndGet() {
        return redisUtil.hdecr(CacheConstant.SESSION_ID_CLIENT_ID, CacheConstant.SESSION_ONLINE_COUNT, 1);
    }

    /**
     * 服务端发送消息给客户端
     *
     * @param message         - 服务端发送消息
     * @param receiverSession - Session 接收方Session
     * @return null -
     * @author XinLau
     * @creed The only constant is change ! ! !
     * @since 2020/7/24 10:29
     */
    private Boolean sendMessage(final String message, final Session receiverSession) {
        try {
            log.info("服务端给客户端[{}]发送消息[{}]", receiverSession.getId(), message);
            receiverSession.getBasicRemote().sendText(message);
            return Boolean.TRUE;
        } catch (Exception e) {
            log.error("服务端发送消息给客户端失败：{}", e.getMessage());
            return Boolean.FALSE;
        }
    }

    /**
     * 连接建立成功调用的方法
     *
     * @param session            - Session
     * @param userIdentification - 用户标识
     * @author XinLau
     * @creed The only constant is change ! ! !
     * @since 2020/7/21 15:53
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userIdentification") String userIdentification) {
        // 存储用户标识与用户Session
        CLIENTS.put(userIdentification, session);
        // 存储用户Session标识与用户标识
        redisUtil.hset(CacheConstant.SESSION_ID_CLIENT_ID, session.getId(), userIdentification);
        // 在线数加 1
        Double onlineCount = incrementAndGet();
        log.info("有新连接加入：{}，当前在线人数为：{}", session.getId(), onlineCount);
        // redis 内注册自己
    }

    /**
     * 连接关闭调用的方法
     *
     * @param session - Session
     * @author XinLau
     * @creed The only constant is change ! ! !
     * @since 2020/7/21 15:53
     */
    @OnClose
    public void onClose(Session session) {
        // 根据Session标识取出用户表示
        String userIdentification = redisUtil.hget(CacheConstant.SESSION_ID_CLIENT_ID, session.getId(), String.class);
        // 根据用户标识剔除用户Session
        CLIENTS.remove(userIdentification);
        // 清除Session标识
        redisUtil.del(CacheConstant.SESSION_ID_CLIENT_ID, session.getId());
        // 在线数减 1
        Double onlineCount = decrementAndGet();
        log.info("有一连接关闭：{}，当前在线人数为：{}", session.getId(), onlineCount);
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message       - 客户端发送过来的消息
     * @param senderSession - Session 发送者的Session
     * @return null -
     * @author XinLau
     * @creed The only constant is change ! ! !
     * @since 2020/7/24 10:26
     */
    @OnMessage
    public void onMessage(String message, Session senderSession) {
        log.info("服务端收到客户端[{}]的消息[{}]", senderSession.getId(), message);
        WebSocketMessageDTO webSocketMessage = JSONObject.parseObject(message, WebSocketMessageDTO.class);
        sendMassages(webSocketMessage);
        this.sendMessage("已发出", senderSession);
    }

    /**
     * 异常
     *
     * @param session - Session
     * @param error   - Throwable
     * @return null -
     * @author XinLau
     * @creed The only constant is change ! ! !
     * @since 2020/7/24 10:27
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("{}发生错误", session.getId());
        error.printStackTrace();
    }

    /**
     * 根据用户标识获取Session
     *
     * @param userIdentification - 用户标识
     * @return Session -
     * @author XinLau
     * @creed The only constant is change ! ! !
     * @since 2020/7/24 18:09
     */
    public Session getSessionByUserIdentification(final String userIdentification) {
        return CLIENTS.get(userIdentification);
    }

    /**
     * 发送WebsocketMessages信息
     *
     * @param webSocketMessage - WebSocket信息对象
     * @return Boolean -
     * @author XinLau
     * @creed The only constant is change ! ! !
     * @since 2020/7/24 18:26
     */
    public Boolean sendMassages(WebSocketMessageDTO webSocketMessage) {
        List<SysWebsocketMessageAddRO> websocketMessageAddArrayList = new ArrayList<>();
        try {
            // 解析客户端发送过来的消息
            if (webSocketMessage != null) {
                for (String receiverIdentification : webSocketMessage.getReceiverIdentificationArray()) {
                    SysWebsocketMessageAddRO websocketMessageAdd = new SysWebsocketMessageAddRO();
                    // 信息内容
                    String messageContent = webSocketMessage.getMessage().toJSONString();
                    websocketMessageAdd.setContent(messageContent);
                    // 发送方
                    websocketMessageAdd.setSender(webSocketMessage.getSenderIdentification());
                    // 接收方
                    websocketMessageAdd.setReceiver(receiverIdentification);
                    // 获取收信端Session
                    Session receiveSession = getSessionByUserIdentification(receiverIdentification);
                    // 信息状态（0：发送成功，1：等待接收，2：发送异常）
                    Integer messageState = CommonConstant.MESSAGE_STATE_SEND_EXCEPTION;
                    if (receiveSession != null) {
                        boolean sendMessage = this.sendMessage(messageContent, receiveSession);
                        if (sendMessage) {
                            // 发送成功
                            messageState = CommonConstant.MESSAGE_STATE_SENT_SUCCESSFULLY;
                        }
                    } else {
                        // 等待接收：未找到接收方Session，认为接收方不在线
                        messageState = CommonConstant.MESSAGE_STATE_WAITING_TO_RECEIVE;
                    }
                    websocketMessageAdd.setMessageState(messageState);
                    websocketMessageAddArrayList.add(websocketMessageAdd);
                }
            }
        } catch (Exception e) {
            log.error(ArgumentResponseEnum.WEBSOCKET_PAGE_QUERY_ERR.getMessage(), e);
            return Boolean.FALSE;
        }
        // 批量保存WebsocketMessages记录
        return sysWebsocketMessageBus.batchAddWebsocketMessages(websocketMessageAddArrayList);
    }

}
